package com.innovation.ic.im.end.data.listener;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.innovation.ic.b1b.framework.manager.RedisManager;
import com.innovation.ic.b1b.framework.manager.ThreadPoolManager;
import com.innovation.ic.im.end.base.handler.im_erp9.GroupMessageHandler;
import com.innovation.ic.im.end.base.handler.im_erp9.MessageHandler;
import com.innovation.ic.im.end.base.handler.im_erp9.ModelHandler;
import com.innovation.ic.im.end.base.handler.im_erp9.UserGroupMessageHandler;
import com.innovation.ic.im.end.base.model.im_erp9.*;
import com.innovation.ic.im.end.base.pojo.ServiceResult;
import com.innovation.ic.im.end.base.pojo.constant.DatabaseGlobal;
import com.innovation.ic.im.end.base.pojo.constant.DatabaseOperationType;
import com.innovation.ic.im.end.base.pojo.constant.ImErp9TableName;
import com.innovation.ic.im.end.base.pojo.constant.RedisStorage;
import com.innovation.ic.im.end.base.service.im_erp9.*;
import com.innovation.ic.im.end.base.thread.data.*;
import com.innovation.ic.im.end.base.thread.web.UpdateUserGroupMessageThread;
import com.innovation.ic.im.end.base.value.config.KafkaIgnoreHandleHourParamConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;

/**
 * Kafka consumer for database-change notifications published on the {@code im-end} topic.
 *
 * <p>Each record value is expected to be a JSON document with the fields {@code database},
 * {@code table}, {@code type} and {@code data} (an array of changed rows). For every row the
 * listener dispatches on the table name and operation type and then either submits a worker
 * task to the thread pool or updates Redis entries directly.
 */
@Component
@EnableScheduling
public class KafkaListener {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private MessageHandler messageHandler;

    @Autowired
    private GroupMessageHandler groupMessageHandler;

    @Autowired
    private UserGroupMessageHandler userGroupMessageHandler;

    @Resource
    private ModelHandler modelHandler;

    @Resource
    private UserGroupService userGroupService;

    @Resource
    private ChatPairService chatPairService;

    @Resource
    private RefGroupAccountService refGroupAccountService;

    @Resource
    private RefUserGroupAccountService refUserGroupAccountService;

    @Resource
    private KafkaIgnoreHandleHourParamConfig kafkaIgnoreHandleHourParamConfig;

    @Resource
    private AccountService accountService;

    @Resource
    private TreeNodeService treeNodeService;

    @Resource
    private RedisManager redisManager;

    // NOTE(review): not referenced anywhere in this class; kept so the injected bean set is unchanged.
    @Resource
    private DialogueService dialogueService;

    @Resource
    private ThreadPoolManager threadPoolManager;

    /**
     * Handles one database-change record.
     *
     * <p>Records received while the current hour of day equals the configured ignore hour are
     * dropped without being parsed. (The original author's note says such changes are picked up
     * later by the {@code ImportRedisDataTask} scheduled job; that job is not visible here.)
     * Records whose value does not parse to a JSON object, whose database is not one of the
     * im-erp9 databases, or whose {@code data} array is missing or empty are ignored.
     *
     * @param consumer the consumed Kafka record; its value is cast to {@code String}
     */
    @org.springframework.kafka.annotation.KafkaListener(topics = "im-end")
    public void listen(ConsumerRecord<?, ?> consumer) {
        int hour = Calendar.getInstance().get(Calendar.HOUR_OF_DAY);
        int ignoreHour = kafkaIgnoreHandleHourParamConfig.getHour();
        if (hour == ignoreHour) {
            return;
        }

        String json = (String) consumer.value();
        JSONObject metaDataJSONObject = JSONObject.parseObject(json);
        if (metaDataJSONObject == null) {
            // A null/empty record value parses to null; there is nothing to dispatch.
            return;
        }

        // The database name is the same for every row, so it is checked once up front.
        String database = metaDataJSONObject.getString("database");
        if (!isImErp9Database(database)) {
            return;
        }

        String table = metaDataJSONObject.getString("table");
        String type = metaDataJSONObject.getString("type");
        JSONArray dataJSONArray = metaDataJSONObject.getJSONArray("data");
        if (dataJSONArray == null || dataJSONArray.isEmpty()) {
            return;
        }

        for (int i = 0; i < dataJSONArray.size(); i++) {
            handleRow(consumer, table, type, dataJSONArray.getJSONObject(i));
        }
    }

    /**
     * Tells whether the given database name is one of the im-erp9 databases this listener handles.
     */
    private boolean isImErp9Database(String database) {
        return DatabaseGlobal.IM_ERP9.equals(database)
                || DatabaseGlobal.IM_ERP9_TEST.equals(database)
                || DatabaseGlobal.IM_ERP9_PROD.equals(database);
    }

    /**
     * Routes one changed row to the handler for its table.
     *
     * <p>Comparisons are written constant-first so a message without a {@code table} field
     * simply matches nothing instead of raising a {@code NullPointerException}.
     */
    private void handleRow(ConsumerRecord<?, ?> consumer, String table, String type, JSONObject row) {
        if (ImErp9TableName.MESSAGE.equals(table)) {
            handleMessage(consumer, type, row);
        }
        if (ImErp9TableName.GROUP_MESSAGE.equals(table)) {
            handleGroupMessage(consumer, type, row);
        }
        if (ImErp9TableName.USER_GROUP_MESSAGE.equals(table)) {
            handleUserGroupMessage(consumer, type, row);
        }
        if (ImErp9TableName.CHAT_PAIR.equals(table)) {
            handleChatPair(consumer, type, row);
        }
        if (ImErp9TableName.ACCOUNT.equals(table)) {
            handleAccount(consumer, type, row);
        }
        if (ImErp9TableName.REF_GROUP_ACCOUNT_OPERATION.equals(table)) {
            handleRefGroupAccountOperation(consumer, row);
        }
        if (ImErp9TableName.GROUP_MESSAGE_RECEIVER.equals(table)) {
            handleGroupMessageReceiver(consumer, row);
        }
        if (ImErp9TableName.REF_USER_GROUP_ACCOUNT.equals(table)) {
            handleRefUserGroupAccount(consumer, row);
        }
        if (ImErp9TableName.REF_GROUP_ACCOUNT.equals(table)) {
            handleRefGroupAccount(consumer, type, row);
        }
        if (ImErp9TableName.USER_GROUP_MESSAGE_RECEIVER.equals(table)) {
            handleUserGroupMessageReceiver(consumer, row);
        }
        if (ImErp9TableName.TREE_NODE.equals(table)) {
            handleTreeNode(consumer, type, row);
        }
    }

    /**
     * {@code message} table: on INSERT/UPDATE submits the matching message task and refreshes
     * the recent-contact data of the sender and the receiver.
     */
    private void handleMessage(ConsumerRecord<?, ?> consumer, String type, JSONObject row) {
        if (DatabaseOperationType.INSERT.equals(type)) {
            printConsumerContent(consumer);
            Message message = modelHandler.toMessageFromTableFormat(row);
            threadPoolManager.execute(new InsertMessageThread(messageHandler, message));
            refreshRecentContacts(toAccountList(message.getFromUserAccount(), message.getToUserAccount()));
        }
        if (DatabaseOperationType.UPDATE.equals(type)) {
            printConsumerContent(consumer);
            Message message = modelHandler.toMessageFromTableFormat(row);
            threadPoolManager.execute(new UpdateMessageThread(messageHandler, message));
            refreshRecentContacts(toAccountList(message.getFromUserAccount(), message.getToUserAccount()));
        }
    }

    /**
     * {@code group_message} table: on INSERT/UPDATE submits the matching group-message task and
     * refreshes the recent-contact data of the accounts returned for the message's group.
     */
    private void handleGroupMessage(ConsumerRecord<?, ?> consumer, String type, JSONObject row) {
        if (DatabaseOperationType.INSERT.equals(type)) {
            printConsumerContent(consumer);
            GroupMessage groupMessage = modelHandler.toGroupMessageFromTableFormat(row);
            threadPoolManager.execute(new InsertGroupMessageThread(groupMessageHandler, groupMessage));
            ServiceResult<List<String>> groupAccounts = refGroupAccountService.getGroupAccounts(groupMessage.getGroupId());
            refreshRecentContacts(groupAccounts.getResult());
        }
        if (DatabaseOperationType.UPDATE.equals(type)) {
            printConsumerContent(consumer);
            GroupMessage groupMessage = modelHandler.toGroupMessageFromTableFormat(row);
            threadPoolManager.execute(new UpdateGroupMessageThread(groupMessageHandler, groupMessage));
            ServiceResult<List<String>> groupAccounts = refGroupAccountService.getGroupAccounts(groupMessage.getGroupId());
            refreshRecentContacts(groupAccounts.getResult());
        }
    }

    /**
     * {@code user_group_message} table: on INSERT/UPDATE submits the matching user-group-message
     * task and refreshes the recent-contact data of the accounts returned for the user group.
     */
    private void handleUserGroupMessage(ConsumerRecord<?, ?> consumer, String type, JSONObject row) {
        if (DatabaseOperationType.INSERT.equals(type)) {
            printConsumerContent(consumer);
            UserGroupMessage userGroupMessage = modelHandler.toUserGroupMessageFromTableFormat(row);
            threadPoolManager.execute(new InsertUserGroupMessageThread(userGroupMessageHandler, userGroupMessage));
            ServiceResult<List<String>> userGroupAccounts = refUserGroupAccountService.getUserGroupAccounts(userGroupMessage.getUserGroupId());
            refreshRecentContacts(userGroupAccounts.getResult());
        }
        if (DatabaseOperationType.UPDATE.equals(type)) {
            printConsumerContent(consumer);
            UserGroupMessage userGroupMessage = modelHandler.toUserGroupMessageFromTableFormat(row);
            threadPoolManager.execute(new UpdateUserGroupMessageThread(userGroupMessageHandler, userGroupMessage));
            ServiceResult<List<String>> userGroupAccounts = refUserGroupAccountService.getUserGroupAccounts(userGroupMessage.getUserGroupId());
            refreshRecentContacts(userGroupAccounts.getResult());
        }
    }

    /**
     * {@code chat_pair} table: on UPDATE refreshes the recent-contact data of both accounts of the pair.
     */
    private void handleChatPair(ConsumerRecord<?, ?> consumer, String type, JSONObject row) {
        if (DatabaseOperationType.UPDATE.equals(type)) {
            printConsumerContent(consumer);
            ChatPair chatPair = modelHandler.toChatPairFromTableFormat(row);
            refreshRecentContacts(toAccountList(chatPair.getFromUserAccount(), chatPair.getToUserAccount()));
        }
    }

    /**
     * {@code account} table: keeps the per-account and all-accounts Redis entries in step with
     * the change. The record is logged and converted for every operation type.
     */
    private void handleAccount(ConsumerRecord<?, ?> consumer, String type, JSONObject row) {
        printConsumerContent(consumer);
        Account account = modelHandler.toAccountFromTableFormat(row);

        if (DatabaseOperationType.UPDATE.equals(type)) {
            // An update additionally refreshes the accounts returned as this user's last contacts.
            refreshRecentContacts(chatPairService.selectLastContactAccounts(account.getUsername()).getResult());
            evictAccountCache(account);
            storeAccountCache(account);
        }
        if (DatabaseOperationType.INSERT.equals(type)) {
            evictAccountCache(account);
            storeAccountCache(account);
        }
        if (DatabaseOperationType.DELETE.equals(type)) {
            evictAccountCache(account);
            // Only the all-accounts entry is rebuilt; the per-account entries stay removed.
            storeAllAccountsCache();
        }
    }

    /**
     * {@code ref_group_account_operation} table: refreshes the recent-contact data of the row's user.
     */
    private void handleRefGroupAccountOperation(ConsumerRecord<?, ?> consumer, JSONObject row) {
        printConsumerContent(consumer);
        RefGroupAccountOperation refGroupAccountOperation = modelHandler.toRefGroupAccountOperationFromTableFormat(row);
        refreshRecentContacts(toAccountList(refGroupAccountOperation.getUsername()));
    }

    /**
     * {@code group_message_receiver} table: refreshes the recent-contact data of the receiving account.
     */
    private void handleGroupMessageReceiver(ConsumerRecord<?, ?> consumer, JSONObject row) {
        printConsumerContent(consumer);
        GroupMessageReceiver groupMessageReceiver = modelHandler.toGroupMessageReceiverFromTableFormat(row);
        refreshRecentContacts(toAccountList(groupMessageReceiver.getToUserAccount()));
    }

    /**
     * {@code ref_user_group_account} table: refreshes the recent-contact data of the row's user,
     * then removes the user group's detail and member-count entries from Redis and re-imports them.
     */
    private void handleRefUserGroupAccount(ConsumerRecord<?, ?> consumer, JSONObject row) {
        printConsumerContent(consumer);
        RefUserGroupAccount refUserGroupAccount = modelHandler.toRefUserGroupAccountFromTableFormat(row);
        refreshRecentContacts(toAccountList(refUserGroupAccount.getUsername()));

        // Remove the cached entries first ...
        redisManager.delRedisDataByKeyPrefix(RedisStorage.REF_USER_GROUP_ACCOUNT_DETAIL_PREFIX + refUserGroupAccount.getUserGroupId());
        redisManager.del(RedisStorage.REF_USER_GROUP_ACCOUNT_BY_GROUP_ID_COUNT_PREFIX + refUserGroupAccount.getUserGroupId());
        // ... then have the service import them again.
        refUserGroupAccountService.importRefUserGroupAccountIntoRedis(refUserGroupAccount.getUserGroupId());
    }

    /**
     * {@code ref_group_account} table: on INSERT/DELETE replaces the cached member count of the group.
     */
    private void handleRefGroupAccount(ConsumerRecord<?, ?> consumer, String type, JSONObject row) {
        printConsumerContent(consumer);
        RefGroupAccount refGroupAccount = modelHandler.toRefGroupAccountFromTableFormat(row);
        if (DatabaseOperationType.INSERT.equals(type) || DatabaseOperationType.DELETE.equals(type)) {
            String countKey = RedisStorage.REF_GROUP_ACCOUNT_BY_GROUP_ID_COUNT_PREFIX + refGroupAccount.getGroupId();
            redisManager.del(countKey);
            ServiceResult<Integer> countByGroupId = refGroupAccountService.getCountByGroupId(refGroupAccount.getGroupId());
            redisManager.set(countKey, JSON.toJSONString(countByGroupId.getResult()));
        }
    }

    /**
     * {@code user_group_message_receiver} table: refreshes the recent-contact data of the receiving account.
     */
    private void handleUserGroupMessageReceiver(ConsumerRecord<?, ?> consumer, JSONObject row) {
        printConsumerContent(consumer);
        UserGroupMessageReceiver userGroupMessageReceiver = modelHandler.toUserGroupMessageReceiverFromTableFormat(row);
        refreshRecentContacts(toAccountList(userGroupMessageReceiver.getToUserAccount()));
    }

    /**
     * {@code tree_node} table: removes the Redis entries stored under the node's key prefix and,
     * for INSERT/UPDATE, submits a task that writes the node back into Redis.
     */
    private void handleTreeNode(ConsumerRecord<?, ?> consumer, String type, JSONObject row) {
        printConsumerContent(consumer);
        TreeNode treeNode = modelHandler.toTreeNodeFromTableFormat(row);

        if (DatabaseOperationType.INSERT.equals(type) || DatabaseOperationType.UPDATE.equals(type)) {
            redisManager.delRedisDataByKeyPrefix(RedisStorage.TREE_NODE_PREFIX + treeNode.getId());

            ServiceResult<List<Account>> listServiceResult = accountService.queryAll();
            List<Account> accounts = listServiceResult.getResult();
            threadPoolManager.execute(new TreeNodeIntoRedisThread(treeNode.getId(), accounts, treeNodeService, redisManager));
        }
        if (DatabaseOperationType.DELETE.equals(type)) {
            redisManager.delRedisDataByKeyPrefix(RedisStorage.TREE_NODE_PREFIX + treeNode.getId());
        }
    }

    /**
     * Collects the given account names into a new mutable list, keeping order and null entries.
     */
    private List<String> toAccountList(String... accounts) {
        List<String> accountList = new ArrayList<>(accounts.length);
        for (String account : accounts) {
            accountList.add(account);
        }
        return accountList;
    }

    /**
     * Submits an {@link InsertUserCurrentDataToRedisThread} for the given accounts to the thread pool.
     */
    private void refreshRecentContacts(List<String> userNameList) {
        threadPoolManager.execute(new InsertUserCurrentDataToRedisThread(userNameList, redisManager, userGroupService));
    }

    /**
     * Deletes the by-id, by-username and all-accounts Redis entries related to the account.
     */
    private void evictAccountCache(Account account) {
        redisManager.del(RedisStorage.ACCOUNT_BY_ADMINS_ID_PREFIX + account.getId());
        redisManager.del(RedisStorage.ACCOUNT_BY_ACCOUNT_PREFIX + account.getUsername());
        redisManager.del(RedisStorage.ACCOUNT_ALL);
    }

    /**
     * Writes the account under its by-id and by-username keys, then rebuilds the all-accounts entry.
     */
    private void storeAccountCache(Account account) {
        redisManager.set(RedisStorage.ACCOUNT_BY_ADMINS_ID_PREFIX + account.getId(), JSON.toJSONString(account));
        redisManager.set(RedisStorage.ACCOUNT_BY_ACCOUNT_PREFIX + account.getUsername(), JSON.toJSONString(account));
        storeAllAccountsCache();
    }

    /**
     * Stores the JSON form of {@code accountService.findAllOrderByRealNameAsc()} under the all-accounts key.
     */
    private void storeAllAccountsCache() {
        redisManager.set(RedisStorage.ACCOUNT_ALL, JSON.toJSONString(accountService.findAllOrderByRealNameAsc()));
    }

    /**
     * Logs the topic, key, partition, offset and value of the consumed record at INFO level.
     *
     * @param consumer the consumed Kafka record
     */
    private void printConsumerContent(ConsumerRecord<?, ?> consumer) {
        logger.info("topic【{}】，key【{}】，分区位置【{}】，下标【{}】，value【{}】",
                consumer.topic(), consumer.key(), consumer.partition(), consumer.offset(), consumer.value());
    }
}